remove disposables from scheduler worker#632
Conversation
WalkthroughWalkthroughThe changes involve significant modifications across multiple files in the Changes
Sequence Diagram(s)sequenceDiagram
participant Scheduler
participant Worker
participant Observer
Scheduler->>Worker: Schedule Task
Worker->>Observer: Notify Task Completion
Observer->>Scheduler: Acknowledge Completion
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
Outside diff range, codebase verification and nitpick comments (1)
src/rpp/rpp/operators/subscribe_on.hpp (1)
50-54: Potential Impact on Observer Management Due to Removed Disposable HandlingThe recent changes in
src/rpp/rpp/operators/subscribe_on.hpphave removed the conditional check and logic related to disposable handling in thesubscribemethod. This could affect how observers are managed, particularly in scenarios where disposables are expected to be handled. Thesubscribe_on_schedulablestruct also lacks disposable management, which might lead to unintended behavior. Consider reviewing these changes to ensure that observer setup and management align with the intended design.
- File:
src/rpp/rpp/operators/subscribe_on.hpp- Lines: 50-54
Analysis chain
Verify the impact of the change on the observer setup and management, and disposable handling.
The change removes the conditional check that previously verified if the worker's disposable was not "none disposable" and the associated logic for setting up the observer's upstream with the disposable.
The
subscribemethod now directly schedules thesubscribe_on_schedulableoperation without any prior checks on the worker's disposable state.This change may impact how observers are set up and managed, potentially leading to different behavior in scenarios where disposables are expected to be handled.
Run the following script to verify the impact of the change:
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the impact of the change on the observer setup and management, and disposable handling. # Test 1: Search for the usage of `subscribe` method. Expect: No occurrences of the old conditional logic and disposable handling. rg --type cpp -A 10 $'void subscribe\(' # Test 2: Search for the usage of `subscribe_on_schedulable`. Expect: No occurrences of disposable handling. rg --type cpp -A 10 $'subscribe_on_schedulable'Length of output: 24740
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (16)
- src/extensions/rppqt/rppqt/schedulers/main_thread.hpp (1 hunks)
- src/rpp/rpp/operators/debounce.hpp (2 hunks)
- src/rpp/rpp/operators/delay.hpp (2 hunks)
- src/rpp/rpp/operators/subscribe_on.hpp (1 hunks)
- src/rpp/rpp/operators/timeout.hpp (1 hunks)
- src/rpp/rpp/schedulers/current_thread.hpp (1 hunks)
- src/rpp/rpp/schedulers/details/worker.hpp (1 hunks)
- src/rpp/rpp/schedulers/fwd.hpp (2 hunks)
- src/rpp/rpp/schedulers/immediate.hpp (1 hunks)
- src/rpp/rpp/schedulers/new_thread.hpp (4 hunks)
- src/rpp/rpp/schedulers/run_loop.hpp (1 hunks)
- src/rpp/rpp/schedulers/test_scheduler.hpp (5 hunks)
- src/rpp/rpp/schedulers/thread_pool.hpp (1 hunks)
- src/rpp/rpp/sources/from.hpp (2 hunks)
- src/rpp/rpp/sources/interval.hpp (2 hunks)
- src/tests/rpp/test_scheduler.cpp (4 hunks)
Additional comments not posted (39)
src/rpp/rpp/schedulers/details/worker.hpp (2)
Line range hint
1-1: Verify the impact of removing theget_disposablefunction.The
get_disposablefunction, which returned adisposable_wrapperbased on the type ofStrategy, has been entirely removed. This change suggests a simplification of the class's interface, potentially indicating that the disposable functionality is no longer necessary or has been refactored elsewhere.Run the following script to verify the impact of the removal:
Line range hint
1-1: Verify the impact of removing theis_none_disposableconstant.The
is_none_disposableconstant, which determined if theStrategytype was non-disposable, has been entirely removed. This change further reinforces the simplification of the class by reducing its public interface and possibly altering how disposable resources are managed within the scheduling strategy.Run the following script to verify the impact of the removal:
Verification successful
Removal of
is_none_disposableConstant VerifiedThe
is_none_disposableconstant was removed as part of a broader change related to disposables, as indicated by the commit message "remove disposables." There are no references or dependencies on this constant elsewhere in the codebase, confirming that its removal does not impact other parts of the codebase. The change simplifies the class by reducing its public interface.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the impact of removing the `is_none_disposable` constant. # Test: Search for the usage of the removed constant. Expect: No occurrences. rg --type cpp $'is_none_disposable'Length of output: 34
Script:
#!/bin/bash # Description: Search for comments or documentation mentioning `is_none_disposable` and check for related changes. # Search for comments or documentation mentioning `is_none_disposable`. rg --type cpp --type md --type txt 'is_none_disposable' # Check the git history for any commits related to the removal of `is_none_disposable`. git log -S 'is_none_disposable' --onelineLength of output: 162
src/rpp/rpp/operators/subscribe_on.hpp (1)
45-45: Verify the impact of the change on the subscription mechanism and disposable management.The change simplifies the
updated_disposable_strategytype alias by directly assigningPrevto it, removing the conditional logic that previously determined the value based on the worker's state.This change may alter the expected behavior of the subscription mechanism, particularly in relation to disposable management.
Run the following script to verify the impact of the change:
Verification successful
Verification successful: No issues found with the change in
updated_disposable_strategy.The change simplifies the
updated_disposable_strategytype alias by directly assigningPrev, which is consistent with its usage in other parts of the codebase. There is no evidence of the old conditional logic affecting the subscription mechanism or disposable management.
- The change aligns with similar patterns in other files.
- No occurrences of the old conditional logic were found in the
subscribemethod.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the impact of the change on the subscription mechanism and disposable management. # Test 1: Search for the usage of `updated_disposable_strategy`. Expect: No occurrences of the old conditional logic. rg --type cpp -A 5 $'updated_disposable_strategy' # Test 2: Search for the usage of `subscribe` method. Expect: No occurrences of the old conditional logic. rg --type cpp -A 10 $'void subscribe\('Length of output: 41537
src/rpp/rpp/schedulers/immediate.hpp (1)
Line range hint
1-1: LGTM: The removal of theget_disposable()function is approved.The removal of the
get_disposable()function is consistent with the PR objective of removing disposables from the scheduler worker. It indicates a simplification of the interface and a change in how disposables are managed within the scheduling context.src/rpp/rpp/schedulers/thread_pool.hpp (1)
54-54: LGTM!The code changes are approved.
src/rpp/rpp/schedulers/test_scheduler.hpp (11)
25-25: LGTM!The removal of inheritance from
rpp::details::base_disposablesimplifies thestatestruct and aligns with the PR objective.
52-52: LGTM!Using
std::maxto ensure the scheduled time point is not earlier than the current time is a good practice to maintain the integrity of the scheduling logic.
53-53: LGTM!Using
schedulings.back()as the time point for scheduling the task maintains consistency between theschedulingsvector and the actual scheduled tasks.
66-66: LGTM!Using a
std::weak_ptr<state>in theworker_strategyconstructor aligns with the removal of disposable functionality and helps prevent potential memory leaks or dangling pointers.
76-76: LGTM!Removing the disposal check before scheduling tasks aligns with the changes made in the
statestruct and simplifies the scheduling logic, consistent with the PR objective.
77-77: LGTM!Calling the
drainmethod after scheduling a task ensures that any tasks scheduled at or before the current time are executed immediately, maintaining the expected behavior of the scheduler.
84-84: LGTM!Using a
std::weak_ptr<state>for them_statemember variable in theworker_strategyclass aligns with the removal of disposable functionality and helps prevent potential memory leaks or dangling pointers, consistent with the constructor modification.
91-91: LGTM!Using
m_statedirectly in thecreate_workermethod reflects the new ownership semantics and simplifies the code, consistent with the overall shift towards using shared and weak pointers for managing the lifecycle of thestateobject.
94-94: LGTM!Directly dereferencing
m_statein theget_schedulingsmethod simplifies the code and enhances readability, consistent with the shift towards using shared pointers for managing thestateobject.
95-95: LGTM!Directly dereferencing
m_statein theget_executionsmethod simplifies the code and enhances readability, consistent with the shift towards using shared pointers for managing thestateobject.
106-106: LGTM!Using a
std::shared_ptr<state>for them_statemember variable in thetest_schedulerclass aligns with the removal of disposable functionality and simplifies the code, consistent with the overall shift towards using shared and weak pointers for managing the lifecycle of thestateobject.src/extensions/rppqt/rppqt/schedulers/main_thread.hpp (1)
Line range hint
1-1: LGTM: The removal ofget_disposable()simplifies the public interface.The removal of the
get_disposable()function from the public interface ofmain_thread_schedulerseems to be a valid simplification of the interface. The change does not appear to affect the core scheduling functionality, which is handled by theworker_strategyclass.src/rpp/rpp/sources/interval.hpp (2)
Line range hint
38-42: LGTM!The code changes are approved. Removing the conditional block simplifies the subscription process and may potentially improve performance by eliminating unnecessary checks.
32-32: Verify the impact of simplifying theexpected_disposable_strategytype alias.The change removes the conditional logic that checked the worker's disposability and directly assigns the type alias to
rpp::details::observables::bool_disposable_strategy_selector. This simplifies the code but may impact how observers are managed in the context of scheduling.Run the following script to verify the usage of
expected_disposable_strategyandbool_disposable_strategy_selectorin the codebase:Verification successful
Verified: Simplification of
expected_disposable_strategyaligns with design choices.The change to use
rpp::details::observables::bool_disposable_strategy_selectorininterval_strategyis consistent with similar usages across the codebase. This suggests a deliberate design choice to standardize disposable strategies in these contexts, minimizing the impact on the overall behavior. The change is verified and aligns with the broader design strategy.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of `expected_disposable_strategy` and `bool_disposable_strategy_selector`. # Test 1: Search for the usage of `expected_disposable_strategy`. # Expect: Only occurrences in the context of `interval_strategy`. rg --type cpp -A 5 $'expected_disposable_strategy' # Test 2: Search for the usage of `bool_disposable_strategy_selector`. # Expect: Occurrences in the context of `interval_strategy` and other relevant places. rg --type cpp -A 5 $'bool_disposable_strategy_selector'Length of output: 25542
src/rpp/rpp/schedulers/run_loop.hpp (1)
Line range hint
1-1: LGTM: The removal of theget_disposable()method simplifies the interface.The
get_disposable()method has been removed from theworker_strategyclass. This change simplifies the interface by eliminating a potentially redundant or unnecessary method, as the concept of a disposable resource may no longer be relevant or needed in this context. The remaining code does not seem to be affected by this removal.src/rpp/rpp/schedulers/new_thread.hpp (12)
32-32: LGTM!The change from
disposabletostate_tis approved as it aligns with the new approach to thread lifecycle management.
35-35: LGTM!The default constructor for
state_tis approved.
Line range hint
37-52: LGTM!The destructor of
state_tis approved as it correctly handles the thread lifecycle based on the newis_stopingflag.
44-44: LGTM!The change from
is_disposedtois_stopingis approved as it aligns with the new approach to thread lifecycle management.
54-60: LGTM!Moving the
defer_tomethod into thestate_tclass is approved as it improves the encapsulation of task queuing within the state.
62-67: LGTM!Introducing the
queue_datastruct is approved as it enhances clarity by separating the concerns of task management and state handling.
69-69: LGTM!Updating the
data_threadmethod to use the newqueue_datastructure is approved as it ensures that the thread operates on the correct state representation.
76-77: LGTM!Updating the condition check to use the new
is_stopingflag is approved as it ensures that the thread exits correctly based on the updated state management logic.
79-79: LGTM!Updating the condition check to use the new
is_stopingflag is approved as it ensures that the thread waits correctly based on the updated state management logic.
123-123: LGTM!Updating the member variable to use the new
queue_datastructure is approved as it ensures that the state is correctly managed within the class.
139-140: LGTM!Updating the
defer_tomethod to use the newstate_tclass is approved as it ensures that the tasks are correctly queued within the worker strategy.
145-145: LGTM!Updating the member variable to use the new
state_tclass is approved as it ensures that the state is correctly managed within the worker strategy.src/rpp/rpp/schedulers/fwd.hpp (1)
147-149: LGTM! The changes simplify thestrategyconcept and align with the PR objective.The changes to the
strategyconcept in therpp::schedulers::constraintnamespace are approved for the following reasons:
The removal of the requirement related to the
get_disposable()method is consistent with the PR objective of removing disposables from the scheduler worker. This simplifies the constraints on thestrategyconcept.The modification of the
requiresclause to remove the parameters indicates a shift in the expected conditions for the concept. This change reduces the strictness of thestrategyconcept, potentially allowing a broader range of types to satisfy it.The overall effect of these changes is a simplification of the
strategyconcept, which aligns with the goal of the PR.src/rpp/rpp/operators/debounce.hpp (1)
170-170: Simplifiedcontainertype alias.The change simplifies the
containertype alias by directly usingDisposableStrategy::disposable_container, removing the conditional logic based on the worker's disposability. This aligns with the overall refactoring goal of simplifying the logic related to disposable workers.How does this change impact the instantiation and utilization of the disposable container within the
debounceoperator? Please provide more context on the implications of this simplification.src/rpp/rpp/sources/from.hpp (1)
87-87: LGTM!The change to
expected_disposable_strategytype alias aligns with the PR objective of removing disposables from the scheduler worker. It simplifies the logic by always usingrpp::details::observables::bool_disposable_strategy_selector.src/rpp/rpp/operators/timeout.hpp (2)
136-138: LGTM!The changes are approved:
- The addition of the
using containerdeclaration is a minor change that does not affect the functionality.- The modification to the
timeoutcalculation is a simplification that reduces dependency on a utility function, which is a positive change.
145-145: LGTM!The change is approved:
- The addition of the
using wrapperdeclaration is a minor change that does not affect the functionality.src/tests/rpp/test_scheduler.cpp (2)
356-356: LGTM!The code change is approved. The removal of the
get_disposable()method call is consistent with the simplification mentioned in the summary.
356-356: LGTM!The code change is approved. Storing the worker in an
std::optionalallows better control over its lifecycle within the test.
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/rpp/rpp/schedulers/test_scheduler.hpp (5 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/schedulers/test_scheduler.hpp
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 303.19 ns | 2.16 ns | 2.18 ns | 0.99 |
| Subscribe empty callbacks to empty observable via pipe operator | 301.05 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 689.80 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1028.48 ns | 3.71 ns | 3.70 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 2255.75 ns | 106.62 ns | 100.56 ns | 1.06 |
| defer from array of 1 - defer + create + subscribe + immediate | 735.64 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2120.28 ns | 59.23 ns | 59.23 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3024.58 ns | 32.42 ns | 32.43 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 29992.05 ns | 31461.77 ns | 28222.53 ns | 1.11 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 38065.93 ns | 54777.28 ns | 49211.27 ns | 1.11 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3549.63 ns | 130.66 ns | 121.00 ns | 1.08 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1088.49 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 832.91 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 995.24 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 851.14 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1241.92 ns | 0.31 ns | 0.62 ns | 0.50 |
| immediate_just(1,2)+last()+subscribe | 949.18 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1106.07 ns | 17.91 ns | 17.91 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 839.03 ns | 0.31 ns | 0.31 ns | 0.99 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 281.61 ns | 2.16 ns | 2.16 ns | 1.00 |
| current_thread scheduler create worker + schedule | 370.54 ns | 5.87 ns | 5.56 ns | 1.06 |
| current_thread scheduler create worker + schedule + recursive schedule | 825.73 ns | 55.98 ns | 56.68 ns | 0.99 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 871.55 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 891.09 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2328.11 ns | 199.42 ns | 167.56 ns | 1.19 |
| immediate_just+buffer(2)+subscribe | 1541.50 ns | 13.60 ns | 13.58 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2379.53 ns | 1129.99 ns | 1102.05 ns | 1.03 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 849.86 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 879.81 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 1976.71 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3403.31 ns | 191.32 ns | 174.45 ns | 1.10 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3660.26 ns | 169.38 ns | 175.66 ns | 0.96 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 136.07 ns | 141.55 ns | 0.96 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3555.65 ns | 1097.74 ns | 944.83 ns | 1.16 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2113.58 ns | 202.87 ns | 203.09 ns | 1.00 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.48 ns | 30.55 ns | 14.59 ns | 2.09 |
| subscribe 100 observers to publish_subject | 199005.60 ns | 15336.68 ns | 15502.24 ns | 0.99 |
| 100 on_next to 100 observers to publish_subject | 27048.68 ns | 17381.58 ns | 20231.98 ns | 0.86 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1399.33 ns | 12.97 ns | 13.89 ns | 0.93 |
| basic sample with immediate scheduler | 1380.75 ns | 5.56 ns | 5.55 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 940.05 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2072.37 ns | 1014.49 ns | 993.00 ns | 1.02 |
| create(on_error())+retry(1)+subscribe | 603.39 ns | 121.52 ns | 108.14 ns | 1.12 |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 1050.84 ns | 4.21 ns | 6.49 ns | 0.65 |
| Subscribe empty callbacks to empty observable via pipe operator | 1050.80 ns | 4.24 ns | 5.23 ns | 0.81 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 2077.37 ns | 0.25 ns | 0.30 ns | 0.85 |
| from array of 1 - create + subscribe + current_thread | 2611.04 ns | 35.44 ns | 40.86 ns | 0.87 |
| concat_as_source of just(1 immediate) create + subscribe | 5917.38 ns | 363.90 ns | 424.07 ns | 0.86 |
| defer from array of 1 - defer + create + subscribe + immediate | 2948.97 ns | 0.26 ns | 0.35 ns | 0.73 |
| interval - interval + take(3) + subscribe + immediate | 7142.75 ns | 166.39 ns | 145.53 ns | 1.14 |
| interval - interval + take(3) + subscribe + current_thread | 8745.38 ns | 127.49 ns | 135.92 ns | 0.94 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 99760.36 ns | 109249.10 ns | 121325.50 ns | 0.90 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 105983.00 ns | 105731.36 ns | 120692.62 ns | 0.88 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 12480.29 ns | 573.31 ns | 500.59 ns | 1.15 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 3069.13 ns | 0.27 ns | 0.36 ns | 0.75 |
| immediate_just+filter(true)+subscribe | 2259.86 ns | 0.26 ns | 0.65 ns | 0.40 |
| immediate_just(1,2)+skip(1)+subscribe | 3015.09 ns | 0.25 ns | 0.30 ns | 0.85 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2259.50 ns | 0.52 ns | 0.75 ns | 0.68 |
| immediate_just(1,2)+first()+subscribe | 3414.01 ns | 0.25 ns | 0.32 ns | 0.78 |
| immediate_just(1,2)+last()+subscribe | 2545.06 ns | 0.25 ns | 0.30 ns | 0.85 |
| immediate_just+take_last(1)+subscribe | 6093.57 ns | 0.25 ns | 0.35 ns | 0.73 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 2276.16 ns | 0.26 ns | 0.37 ns | 0.70 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 1478.22 ns | 6.98 ns | 8.42 ns | 0.83 |
| current_thread scheduler create worker + schedule | 1802.78 ns | 44.79 ns | 47.03 ns | 0.95 |
| current_thread scheduler create worker + schedule + recursive schedule | 2258.30 ns | 255.45 ns | 262.47 ns | 0.97 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 3136.93 ns | 5.13 ns | 6.19 ns | 0.83 |
| immediate_just+scan(10, std::plus)+subscribe | 4669.96 ns | 0.75 ns | 1.07 ns | 0.70 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 7903.31 ns | 449.15 ns | 532.14 ns | 0.84 |
| immediate_just+buffer(2)+subscribe | 2667.13 ns | 75.59 ns | 86.57 ns | 0.87 |
| immediate_just+window(2)+subscribe + subscsribe inner | 5743.67 ns | 2567.36 ns | 3116.47 ns | 0.82 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 2276.81 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 2988.50 ns | 0.25 ns | 0.61 ns | 0.42 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 5260.80 ns | 5.52 ns | 7.29 ns | 0.76 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 8358.50 ns | 693.41 ns | 805.41 ns | 0.86 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 15709.03 ns | 491.69 ns | 568.97 ns | 0.86 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 518.32 ns | 847.87 ns | 0.61 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 12114.58 ns | 2438.18 ns | 3418.18 ns | 0.71 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 7167.78 ns | 1367.08 ns | 1657.85 ns | 0.82 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 91.06 ns | 53.51 ns | 72.69 ns | 0.74 |
| subscribe 100 observers to publish_subject | 527663.50 ns | 80120.10 ns | 58600.17 ns | 1.37 |
| 100 on_next to 100 observers to publish_subject | 62100.06 ns | 23358.97 ns | 22818.83 ns | 1.02 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 4267.88 ns | 69.52 ns | 111.35 ns | 0.62 |
| basic sample with immediate scheduler | 4152.01 ns | 22.63 ns | 32.82 ns | 0.69 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 2546.85 ns | 0.25 ns | 0.62 ns | 0.41 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 7061.90 ns | 4485.06 ns | 9318.59 ns | 0.48 |
| create(on_error())+retry(1)+subscribe | 2016.32 ns | 318.34 ns | 364.62 ns | 0.87 |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 272.68 ns | 1.57 ns | 1.56 ns | 1.01 |
| Subscribe empty callbacks to empty observable via pipe operator | 266.21 ns | 1.57 ns | 1.54 ns | 1.02 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 570.32 ns | 0.37 ns | 0.31 ns | 1.19 |
| from array of 1 - create + subscribe + current_thread | 784.45 ns | 4.32 ns | 4.36 ns | 0.99 |
| concat_as_source of just(1 immediate) create + subscribe | 2450.46 ns | 131.30 ns | 130.25 ns | 1.01 |
| defer from array of 1 - defer + create + subscribe + immediate | 766.90 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2226.40 ns | 58.26 ns | 58.30 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3185.79 ns | 30.86 ns | 30.88 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 30134.06 ns | 31016.22 ns | 28176.45 ns | 1.10 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 36879.52 ns | 36415.90 ns | 37333.68 ns | 0.98 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3692.40 ns | 151.68 ns | 149.48 ns | 1.01 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1132.66 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 841.14 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1080.01 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 862.81 ns | 0.31 ns | 0.62 ns | 0.50 |
| immediate_just(1,2)+first()+subscribe | 1360.76 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1016.39 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1190.21 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 860.14 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 280.28 ns | 1.57 ns | 1.54 ns | 1.02 |
| current_thread scheduler create worker + schedule | 390.28 ns | 4.94 ns | 4.78 ns | 1.03 |
| current_thread scheduler create worker + schedule + recursive schedule | 863.15 ns | 60.15 ns | 56.33 ns | 1.07 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 838.06 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 960.50 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2287.60 ns | 138.51 ns | 137.64 ns | 1.01 |
| immediate_just+buffer(2)+subscribe | 1511.67 ns | 13.58 ns | 13.58 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2448.68 ns | 936.37 ns | 968.03 ns | 0.97 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 842.43 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 845.03 ns | 0.31 ns | 0.31 ns | 1.01 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2028.05 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3260.57 ns | 159.96 ns | 158.44 ns | 1.01 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3757.00 ns | 148.09 ns | 146.42 ns | 1.01 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 143.70 ns | 145.20 ns | 0.99 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3423.35 ns | 837.94 ns | 865.50 ns | 0.97 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2223.17 ns | 202.94 ns | 200.26 ns | 1.01 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 53.96 ns | 17.82 ns | 17.68 ns | 1.01 |
| subscribe 100 observers to publish_subject | 212269.80 ns | 15959.44 ns | 16092.61 ns | 0.99 |
| 100 on_next to 100 observers to publish_subject | 42928.79 ns | 17380.35 ns | 20491.95 ns | 0.85 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1294.35 ns | 11.73 ns | 12.04 ns | 0.97 |
| basic sample with immediate scheduler | 1309.13 ns | 6.17 ns | 5.86 ns | 1.05 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 999.46 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2166.67 ns | 1245.85 ns | 1242.86 ns | 1.00 |
| create(on_error())+retry(1)+subscribe | 660.29 ns | 138.60 ns | 138.63 ns | 1.00 |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 575.05 ns | 4.01 ns | 4.32 ns | 0.93 |
| Subscribe empty callbacks to empty observable via pipe operator | 585.87 ns | 4.01 ns | 4.32 ns | 0.93 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1152.38 ns | 9.71 ns | 9.64 ns | 1.01 |
| from array of 1 - create + subscribe + current_thread | 1424.36 ns | 17.99 ns | 17.90 ns | 1.01 |
| concat_as_source of just(1 immediate) create + subscribe | 3801.62 ns | 174.74 ns | 188.89 ns | 0.93 |
| defer from array of 1 - defer + create + subscribe + immediate | 1193.42 ns | 9.40 ns | 9.41 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 3390.80 ns | 145.36 ns | 145.23 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3431.67 ns | 66.50 ns | 64.77 ns | 1.03 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 120400.00 ns | 121777.78 ns | 114800.00 ns | 1.06 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 129812.50 ns | 128022.22 ns | 130937.50 ns | 0.98 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5385.13 ns | 211.36 ns | 212.61 ns | 0.99 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1825.35 ns | 25.28 ns | 25.29 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 1326.03 ns | 24.36 ns | 24.36 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1741.15 ns | 24.12 ns | 24.07 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1375.63 ns | 29.05 ns | 28.99 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 2080.00 ns | 22.83 ns | 22.82 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1788.98 ns | 24.30 ns | 24.06 ns | 1.01 |
| immediate_just+take_last(1)+subscribe | 2019.82 ns | 70.16 ns | 69.59 ns | 1.01 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1336.54 ns | 27.62 ns | 27.44 ns | 1.01 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 482.25 ns | 6.79 ns | 6.17 ns | 1.10 |
| current_thread scheduler create worker + schedule | 651.97 ns | 13.80 ns | 14.08 ns | 0.98 |
| current_thread scheduler create worker + schedule + recursive schedule | 1355.57 ns | 109.24 ns | 105.75 ns | 1.03 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1335.78 ns | 24.42 ns | 24.37 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 1447.57 ns | 26.82 ns | 26.82 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3478.03 ns | 213.28 ns | 206.57 ns | 1.03 |
| immediate_just+buffer(2)+subscribe | 2647.73 ns | 68.71 ns | 68.49 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 4041.20 ns | 1330.98 ns | 1318.74 ns | 1.01 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1626.90 ns | 23.12 ns | 23.12 ns | 1.00 |
| immediate_just+take_while(true)+subscribe | 1332.32 ns | 24.36 ns | 24.36 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3517.52 ns | 11.10 ns | 11.10 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5243.88 ns | 226.76 ns | 222.77 ns | 1.02 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5467.61 ns | 221.15 ns | 208.87 ns | 1.06 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 195.68 ns | 193.87 ns | 1.01 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5435.71 ns | 954.56 ns | 936.70 ns | 1.02 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3588.86 ns | 522.73 ns | 506.33 ns | 1.03 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 36.56 ns | 19.83 ns | 20.35 ns | 0.97 |
| subscribe 100 observers to publish_subject | 263325.00 ns | 29324.32 ns | 30703.03 ns | 0.96 |
| 100 on_next to 100 observers to publish_subject | 55015.79 ns | 32661.76 ns | 38783.33 ns | 0.84 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1904.79 ns | 101.52 ns | 101.40 ns | 1.00 |
| basic sample with immediate scheduler | 1906.27 ns | 73.73 ns | 74.57 ns | 0.99 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1468.82 ns | 24.97 ns | 24.97 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2182.66 ns | 349.61 ns | 350.21 ns | 1.00 |
| create(on_error())+retry(1)+subscribe | 1176.63 ns | 141.68 ns | 143.31 ns | 0.99 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## v2 #632 +/- ##
==========================================
+ Coverage 95.62% 95.65% +0.03%
==========================================
Files 98 98
Lines 1897 1865 -32
==========================================
- Hits 1814 1784 -30
+ Misses 83 81 -2 ☔ View full report in Codecov by Sentry. |
|


Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests